package com.atguigu.gmall1118.app

import com.atguigu.gmall1118.bean.TagInfo
import com.atguigu.gmall1118.constant.CodeConst
import com.atguigu.gmall1118.dao.TagInfoDAO
import com.atguigu.gmall1118.util.ClickhouseUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

/**
 * Driver that dumps the enabled tags into the ClickHouse bitmap tables.
 *
 * Steps:
 *  1. Fetch the list of tags via `TagInfoDAO.getTagInfoListWithOn()`.
 *  2. Split the tags by their value type.
 *  3. For every value type, run one `insert into ... select` statement that writes
 *     into the table dedicated to that value type.
 */
object TaskBitmapApp {

  /**
   * Entry point.
   *
   * @param args `args(0)` is the task id (not used by this job, but its position is kept),
   *             `args(1)` is the business date; dashes in it are stripped before it is used
   *             as the `user_tag_merge_` table suffix and as the `dt` value.
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a readable message before a Spark context is started.
    require(args.length >= 2, "usage: TaskBitmapApp <taskId> <busiDate>")
    val busiDate: String = args(1)

    val sparkConf: SparkConf = new SparkConf().setAppName("task_bitmap_app") //.setMaster("local[*]")
    val context = new SparkContext(sparkConf)
    try {
      val tagInfoList: List[TagInfo] = TagInfoDAO.getTagInfoListWithOn()

      // Target table for each supported tag value type, in the order they are processed.
      val targets = Seq(
        "user_tag_value_string" -> CodeConst.TAG_VALUE_TYPE_STRING,
        "user_tag_value_long" -> CodeConst.TAG_VALUE_TYPE_LONG,
        "user_tag_value_decimal" -> CodeConst.TAG_VALUE_TYPE_DECIMAL,
        "user_tag_value_date" -> CodeConst.TAG_VALUE_TYPE_DATE
      )

      // Report tags whose value type matches none of the targets instead of dropping them silently.
      val knownTypes = targets.map(_._2)
      tagInfoList
        .filterNot(tagInfo => knownTypes.contains(tagInfo.tagValueType))
        .foreach { tagInfo =>
          println(s"skip tag ${tagInfo.tagCode}: unsupported tag value type ${tagInfo.tagValueType}")
        }

      // Split the tags by value type and insert each group into its own table.
      targets.foreach { case (targetTable, valueType) =>
        val tagsOfType = ListBuffer.empty[TagInfo] ++= tagInfoList.filter(_.tagValueType == valueType)
        insertBitmapTableByValueType(targetTable, busiDate, tagsOfType)
      }
    } finally {
      // Release the Spark context even when a ClickHouse statement fails.
      context.stop()
    }
  }

  /**
   * Rebuilds the rows of `targetTable` for one business date from the
   * `user_tag_merge_<date>` table. Does nothing when `tagInfoList` is empty.
   *
   * For every tag, the lower-cased tag code is used both as the `tag_code` literal and as
   * the name of the column read from the merge table. Rows are grouped by (tag code, tag value),
   * the uids of each group are aggregated with `groupBitmapState`, and an empty tag value is
   * stored as '0'.
   *
   * @param targetTable table to write into
   * @param busiDate    business date; dashes are removed before it is used in SQL
   * @param tagInfoList tags whose values belong in `targetTable`
   */
  def insertBitmapTableByValueType(targetTable: String, busiDate: String, tagInfoList: ListBuffer[TagInfo]): Unit = {
    if (tagInfoList.nonEmpty) {
      val busiDateNew = busiDate.replace("-", "")

      // Idempotency: remove what an earlier run wrote for this date. The filter must use the
      // same normalised date the insert below writes into the last column, otherwise a
      // dashed busiDate would never match and re-runs would accumulate duplicate rows.
      val deleteSQL = s" alter table $targetTable delete where dt='$busiDateNew'"
      println(deleteSQL)
      ClickhouseUtil.executeSql(deleteSQL)

      // One ('tag_code', tag_column) tuple per tag; the second element is deliberately
      // unquoted because it is a column reference, not a string literal.
      val tagCodeSQL = tagInfoList
        .map { tagInfo =>
          val code = tagInfo.tagCode.toLowerCase
          s"('$code',$code)"
        }
        .mkString(",")

      val insertSQL =
        s"""
           | insert into $targetTable
           | select   tg.1 as tag_code, if(tg.2='','0',tg.2) as tag_value  , groupBitmapState(cast (uid as UInt64)) ,'$busiDateNew'
           | from
           | (
           | select uid, arrayJoin([$tagCodeSQL] ) tg
           | from user_tag_merge_$busiDateNew
           | ) tt
           | group by tg.1 ,tg.2
           |
           |
           |""".stripMargin

      println(insertSQL)
      ClickhouseUtil.executeSql(insertSQL)
    }
  }

}
